package cn.itcast.es.utils;

import com.alibaba.fastjson.JSON;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Thin helper around {@link RestHighLevelClient} bound to a single index.
 *
 * <p>Documents are serialized to and from JSON with fastjson. {@link #getById} and
 * {@link #search} use the client's asynchronous API and expose the result as Reactor
 * publishers; every other operation blocks until Elasticsearch has responded.
 *
 * @param <T> the document type stored in the index
 * @author 虎哥
 */
public class ElasticUtils<T> {

    /** Name under which the completion suggestion is registered and later looked up. */
    private static final String SUGGESTION_NAME = "name_suggest";

    private final RestHighLevelClient highLevelClient;
    private final String indexName;

    /**
     * Creates a helper that targets the given index.
     *
     * @param highLevelClient client used for every request; it is closed by {@link #close()}
     * @param indexName       name of the index all operations are executed against
     */
    public ElasticUtils(RestHighLevelClient highLevelClient, String indexName) {
        this.highLevelClient = highLevelClient;
        this.indexName = indexName;
    }

    /**
     * Creates the index using the given JSON definition.
     *
     * @param source JSON body of the create-index request
     * @throws IOException if the request fails
     */
    public void createIndex(String source) throws IOException {
        CreateIndexRequest request = new CreateIndexRequest(indexName);
        request.source(source, XContentType.JSON);
        highLevelClient.indices().create(request, RequestOptions.DEFAULT);
    }

    /**
     * Deletes the index.
     *
     * @throws IOException if the request fails
     */
    public void deleteIndex() throws IOException {
        DeleteIndexRequest request = new DeleteIndexRequest(indexName);
        highLevelClient.indices().delete(request, RequestOptions.DEFAULT);
    }

    /**
     * Indexes a document under the given id.
     *
     * @param t  document to store; it is serialized to JSON with fastjson
     * @param id document id; must not be {@code null}
     * @throws IOException if the request fails
     */
    public void addGoods(T t, Long id) throws IOException {
        IndexRequest request = new IndexRequest(indexName);
        request.id(id.toString());
        request.source(JSON.toJSONString(t), XContentType.JSON);
        highLevelClient.index(request, RequestOptions.DEFAULT);
    }

    /**
     * Deletes the document with the given id.
     *
     * @param id document id; must not be {@code null}
     * @throws IOException if the request fails
     */
    public void deleteById(Long id) throws IOException {
        DeleteRequest request = new DeleteRequest(indexName, id.toString());
        highLevelClient.delete(request, RequestOptions.DEFAULT);
    }

    /**
     * Fetches a document by id asynchronously.
     *
     * <p>The request is sent when the returned {@link Mono} is subscribed to. The Mono
     * completes empty when the response carries no source (for example because the
     * document does not exist) and signals an error when the request fails.
     *
     * @param id    document id; must not be {@code null}
     * @param clazz class the document source is deserialized into
     * @return a Mono emitting the deserialized document, or empty if there is none
     * @throws IOException never thrown by this implementation; kept so that existing
     *                     callers continue to compile
     */
    public Mono<T> getById(Long id, Class<T> clazz) throws IOException {
        GetRequest request = new GetRequest(indexName, id.toString());
        return Mono.create(monoSink ->
                highLevelClient.getAsync(request, RequestOptions.DEFAULT, new ActionListener<GetResponse>() {
                    @Override
                    public void onResponse(GetResponse response) {
                        String json = response.getSourceAsString();
                        if (json == null) {
                            // No source to deserialize: complete without a value explicitly
                            // instead of relying on the JSON parser's handling of null input.
                            monoSink.success();
                            return;
                        }
                        monoSink.success(JSON.parseObject(json, clazz));
                    }

                    @Override
                    public void onFailure(Exception e) {
                        monoSink.error(e);
                    }
                }));
    }

    /**
     * Runs a search asynchronously and streams the hits.
     *
     * <p>The request is sent when the returned {@link Flux} is subscribed to. Every hit of
     * the response is deserialized and emitted in response order, then the Flux completes.
     * A failed request is signalled as an error.
     *
     * @param sourceBuilder the search definition (query, paging, sorting, ...)
     * @param clazz         class each hit's source is deserialized into
     * @return a Flux emitting one deserialized object per hit
     * @throws IOException never thrown by this implementation; kept so that existing
     *                     callers continue to compile
     */
    public Flux<T> search(SearchSourceBuilder sourceBuilder, Class<T> clazz) throws IOException {
        SearchRequest request = new SearchRequest(indexName);
        request.source(sourceBuilder);
        return Flux.create(fluxSink ->
                highLevelClient.searchAsync(request, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
                    @Override
                    public void onResponse(SearchResponse response) {
                        // The total hit count is deliberately not read: it is not needed here
                        // and getTotalHits() is null when the caller disables hit tracking.
                        for (SearchHit hit : response.getHits().getHits()) {
                            fluxSink.next(JSON.parseObject(hit.getSourceAsString(), clazz));
                        }
                        fluxSink.complete();
                    }

                    @Override
                    public void onFailure(Exception e) {
                        fluxSink.error(e);
                    }
                }));
    }

    /**
     * Returns completion suggestions for the given prefix.
     *
     * @param prefix text the suggestions must start with
     * @param field  name of the completion field to query
     * @return the text of every returned option, in response order; empty if the
     *         response contains no suggestion section
     * @throws IOException if the request fails
     */
    public List<String> suggest(String prefix, String field) throws IOException {
        // Build a search request that carries only the completion suggestion.
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.suggest(
                new SuggestBuilder().addSuggestion(SUGGESTION_NAME,
                        SuggestBuilders.completionSuggestion(field).prefix(prefix)));
        SearchRequest request = new SearchRequest(indexName);
        request.source(sourceBuilder);

        SearchResponse response = highLevelClient.search(request, RequestOptions.DEFAULT);

        // Both lookups may yield null; treat a missing suggestion section as "no results".
        Suggest suggest = response.getSuggest();
        if (suggest == null) {
            return new ArrayList<>();
        }
        CompletionSuggestion suggestion = suggest.getSuggestion(SUGGESTION_NAME);
        if (suggestion == null) {
            return new ArrayList<>();
        }

        List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();
        List<String> list = new ArrayList<>(options.size());
        for (CompletionSuggestion.Entry.Option option : options) {
            Text text = option.getText();
            list.add(text.toString());
        }
        return list;
    }

    /**
     * Closes the underlying client that was passed to the constructor.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        this.highLevelClient.close();
    }
}
